package org.msgcenter.mc.actor;

import cn.hutool.core.lang.TypeReference;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpStatus;
import cn.hutool.json.JSONUtil;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.MessageConsumer;
import lombok.extern.slf4j.Slf4j;
import org.msgcenter.mc.common.JsonResult;
import org.msgcenter.mc.ds.SiteMsg;
import org.msgcenter.mc.ds.TurnOnOffMsg;

import java.util.function.BiConsumer;
import java.util.function.Consumer;

// 站内信推送
@Slf4j
public class MsgToSiteActor extends AbstractVerticle {

  private EventBus eb;

  // 推送开关
  private boolean pushOnOff = true;

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    super.start(startPromise);

    eb = vertx.eventBus();

    onPushSiteMsg();
    onSendTurnOff();
  }


  // 接受到推送消息
  private void onPushSiteMsg() {
    MessageConsumer<String> consumer = eb.consumer("service.push_msg.site_msg");

    BiConsumer<String, SiteMsg> errHandler = (errLog, sm) -> {
      JsonResult<String, SiteMsg> jr = new JsonResult<>(HttpStatus.HTTP_INTERNAL_ERROR, errLog, sm);
      eb.publish("service.push_msg.site_msg.err", JSONUtil.toJsonStr(jr));
    };

    Consumer<SiteMsg> okHandler = sm -> {
      JsonResult<String, SiteMsg> jr = new JsonResult<>(HttpStatus.HTTP_OK, "", sm);
      eb.publish("service.push_msg.site_msg.ok", JSONUtil.toJsonStr(jr));
    };

    consumer.handler(payload -> {
        SiteMsg msg = JSONUtil.toBean(payload.body(), new TypeReference<>() {
        }, true);

        // 对于站点消息 保存到相关数据表即可
        if (pushOnOff) {
          log.info("发送站点消息到数据库 {}", msg);
          eb.request("db.post.add_site_msg", JSONUtil.toJsonStr(msg))
            .onSuccess(ok -> okHandler.accept(msg))
            .onFailure(err -> errHandler.accept(StrUtil.format("发送站点消息出现错误 {}", err.getMessage()), msg));

        } else {
          String errLog = StrUtil.format("发送站点消息开关关闭 暂不发送消息 {}", msg);
          log.info(errLog);

          errHandler.accept(errLog, msg);
        }
      })
      .exceptionHandler(err -> log.error("发送站点消息出现错误 {}", err.getMessage()));
  }

  // 开关执行动作
  private void onSendTurnOff() {
    MessageConsumer<String> consumer = eb.consumer("service.on_off.site_msg");

    consumer.handler(payload -> {
      TurnOnOffMsg msg = JSONUtil.toBean(payload.body(), new TypeReference<>() {
      }, true);

      log.info("站点消息执行开关操作 : {}", msg);
      pushOnOff = msg.getStatus();
    });
  }

  @Override
  public void stop(Promise<Void> stopPromise) throws Exception {
    super.stop(stopPromise);
  }
}
